!jupyter trust 34_Database_Cloud_and_Streaming_Tutorial.ipynb
Active code page: 437 Signing notebook: 34_Database_Cloud_and_Streaming_Tutorial.ipynb
from IPython.display import Image
This tutorial is part of the Stevens Data Cloud Project (SDC).
We will introduce the following databases with installation instructions and configuration:
| Type | Name | Features | Platform |
|---|---|---|---|
| Relational Database | PostgreSQL | Relational Database | PC/AWS/Cloud Platform |
| Non-Relational Database | MongoDB | Document Database | PC/AWS/Cloud Platform |
| Non-Relational Database | Cassandra | NoSQL | PC/AWS/Cloud Platform |
| Public MiddleWare | Kafka | Message Queue | PC/AWS/Cloud Platform |
| Private MiddleWare | Kinesis | Message Queue | AWS ONLY |
Here is the tutorial list; you can click each item to jump directly to that section. Sections are tagged [EASY], [RECOMMEND], [NORMAL], or [HARD].
We also list many URLs that may be useful as you work through this tutorial.
PostgreSQL is an open-source relational database that can be installed on any platform. We will cover installation on different platforms: local Windows and Linux.
The latest PostgreSQL release is 12, but we recommend the older 10.14 version; older releases are sometimes more stable.
Here is the download link: https://www.enterprisedb.com/downloads/postgres-postgresql-downloads
Image(filename='04_image/101.png')
#PostgreSQL uses a server-client architecture. Install all components during setup
Image(filename='04_image/102.png')
#PostgreSQL requires a password when you log into the database. REMEMBER THE PASSWORD YOU ENTER HERE!!!
#Do not change the port number (5432) during installation.
Image(filename='04_image/103.png')
#In the next step you may need drivers for PostgreSQL. Select the local server (the second option) here.
#If you don't need drivers to connect to PostgreSQL, you can click Cancel to skip this.
Image(filename='04_image/104.png')
# After a successful install, you can find shortcuts in your Windows Start menu.
# PostgreSQL 10 uses a server-client model.
# To start PostgreSQL, start the PostgreSQL server first,
# which is "pgAdmin 4" in the screenshot. This is only the server; we still use a browser as the client to access it
# You can also find the executable at "~program/PostgreSQL/10/pgAdmin 4/bin/pgAdmin4.exe"
Image(filename='04_image/105.png')
#If you have trouble starting the server, open "Task Manager" and check the status of "postgresql-x64-10"
#The status should be "Running" to confirm the PostgreSQL server is working
Image(filename='04_image/106.png')
#After clicking "pgAdmin 4" you will see a new web application in your browser
#Enter the password in the pop-up window.
Image(filename='04_image/107.png')
#If you see a cross next to a database, it probably means your PostgreSQL server is not running.
Image(filename='04_image/108.png')
#The correct status should be like this
Image(filename='04_image/109.png')
#The default way to access PostgreSQL is the command line for SQL; the web application can also be used for data management
Image(filename='04_image/110.png')
#After clicking "SQL Shell" you will see a new command line.
#If you are only connecting to the local server, just press "ENTER" through the prompts
#Enter a database name, for instance salesdb, to access a specific database in your PostgreSQL instance
#postgres is the default superuser, and the password is the one you set during installation
#When you see "salesdb=#", you have successfully connected to the database
Image(filename='04_image/111.png')
#You can manage users in this section
Image(filename='04_image/112.png')
# Besides using the command line to connect to PostgreSQL,
# I recommend using a Python package to connect and manipulate data.
# Several packages can achieve this, for instance ipython-sql and psycopg2.
# We only demonstrate ipython-sql. Use conda to install ipython-sql and sqlalchemy.
# After a successful install, you can use the following code to connect to a PostgreSQL database
%load_ext sql
%sql postgresql://postgres:00wasabi00@127.0.0.1/salesdb
The first command loads the SQL magic extension. The second connects to PostgreSQL; the format is user:password@ipaddress/databasename
#Success screen
#Start a cell with "%%sql" and follow it with a normal SQL query
Image(filename='04_image/113.png')
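The connection string above can also be assembled programmatically. This small helper is our own (not part of ipython-sql or SQLAlchemy) and is just a sketch of how the pieces user, password, host, and database name fit together; it adds the default port 5432 explicitly:

```python
# Hypothetical helper: build the SQLAlchemy-style URL that %sql expects,
# i.e. postgresql://user:password@host:port/dbname
def pg_url(user, password, host, dbname, port=5432):
    return f"postgresql://{user}:{password}@{host}:{port}/{dbname}"

print(pg_url("postgres", "00wasabi00", "127.0.0.1", "salesdb"))
# postgresql://postgres:00wasabi00@127.0.0.1:5432/salesdb
```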
Concretely, we can follow the official guide at https://www.postgresql.org/download/linux/ubuntu/
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get -y install postgresql-10
#After installation, make sure you installed the correct version
#psql -V
Image(filename='04_image/115.png')
#You can use "sudo -u postgres psql" to log into the database as the superuser
Image(filename='04_image/116.png')
#If you need to connect remotely, you must give the superuser a new password
#ALTER USER postgres PASSWORD '00wasabi00';
Image(filename='04_image/117.png')
#You also need to edit the configuration file
#sudo nano /etc/postgresql/10/main/postgresql.conf
Image(filename='04_image/118.png')
%load_ext sql
%sql postgresql://postgres:00wasabi00@54.90.59.139/postgres
(psycopg2.OperationalError) FATAL: no pg_hba.conf entry for host "98.109.25.198", user "postgres", database "postgres", SSL on
FATAL: no pg_hba.conf entry for host "98.109.25.198", user "postgres", database "postgres", SSL off
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Connection info needed in SQLAlchemy format, example:
postgresql://username:password@hostname/dbname
or an existing connection: dict_keys([])
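The pg_hba.conf error above means the server rejected the connecting host. A hedged sketch of the configuration changes that typically allow remote connections (paths assume the Ubuntu PostgreSQL 10 package; the "allow all" entry is for education only):

```
# /etc/postgresql/10/main/postgresql.conf -- listen on all interfaces
listen_addresses = '*'

# /etc/postgresql/10/main/pg_hba.conf -- allow any host with password auth
# (education only; never leave 0.0.0.0/0 open in production)
host    all    all    0.0.0.0/0    md5

# then restart the server:
# sudo systemctl restart postgresql
```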
This tutorial introduces MongoDB, including installation, connection, and basic operations.
MongoDB is a NoSQL database, usually called a document database. Instead of SQL's table-column-attribute format, MongoDB uses collections of key-value documents. One MongoDB server can contain several databases, such as admin, local (default), 03_test_db, 04_geo_db, etc. Each database can contain many collections (tables); each collection is isolated and holds a complete set of JSON documents.
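To make the collection-key-value model concrete, here is a minimal sketch (the names and values are invented for illustration) of a single document that would replace a relational row plus a joined orders table:

```python
import json

# A relational row like (name='Alice', city='Hoboken') plus its related
# orders table becomes one self-describing document; the nested list
# needs no join table.
document = {
    "name": "Alice",
    "city": "Hoboken",
    "orders": [
        {"item": "book", "qty": 2},
        {"item": "pen", "qty": 10},
    ],
}
print(json.dumps(document, indent=2))
```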
Traditionally, we can install MongoDB on:
We can connect to MongoDB in various ways:
Here is the catalogue of this tutorial; the topics progress from easy to hard.
| Level | Server | Type | Connection Method |
|---|---|---|---|
| Easy | Windows | Local | MongoDB Compass + Mongo Shell + PyMongo |
| Recommend | Mongo Atlas | Cloud Cluster | MongoDB Compass + Mongo Shell + PyMongo |
| Normal | AWS EC2 | Cloud Non-Cluster | MongoDB Compass + Mongo Shell + PyMongo |
| Hard | AWS DocumentDB | Cloud Cluster | Mongo Shell + PyMongo |
Image(filename='04_image/01.png')
The installation directory is C:\Program Files\MongoDB\Server\4.2\bin, and we should add it to the environment PATH for easy use.
mongod.exe is the server application; add --dbpath "d:\data\db" to change the default data path from c:\data\db.
mongo.exe is MongoDB's own command-line UI. After starting mongod.exe (think of it as the server), use mongo.exe to access the database.
mongod.cfg does not take effect on Windows, so you need to create the C:\data\db directory manually to match the default setting.
Run mongod first to start the MongoDB service, then run mongo to start the Mongo Shell and access your local database.
Image(filename='04_image/02.png')
#When you run the "mongod" command, output like this indicates success
Image(filename='04_image/48.png')
#When you run the "mongo" command, output like this indicates success
Image(filename='04_image/49.png')
#The default config has no administrator user; we should add a "dbadmin" user to the "admin" database
Image(filename='04_image/50.png')
#After creating an account on Mongo Atlas, you can choose a free server for education purposes
Image(filename='04_image/51.png')
#Before we start using this cluster we need to add a user and create a network access list
#Here we create a "dbAdmin" user with username: mongoadmin, password: mongoadmin
Image(filename='04_image/52.png')
#For education purposes, we add "allow all access 0.0.0.0/0" to our network access list
Image(filename='04_image/53.png')
# Once the cluster is ready, you will see this. The connection part is covered in the following section
# We can access this cloud database in several ways
Image(filename='04_image/06.png')
Typically, we could install MongoDB on a new EC2 instance directly. But since we will set up an entire MongoDB deployment in the next section, we recommend using AWS CloudFormation here to build a non-cluster MongoDB.
#Open the AWS CloudFormation service; it's an easy and quick setup platform
Image(filename='04_image/54.png')
#Search MongoDB
Image(filename='04_image/55.png')
#A VPC is a private zone or area. If all machines are assigned to the same VPC, they can communicate without further configuration.
Image(filename='04_image/56.png')
#CloudFormation provides a basic template; we don't need to change the first-step settings
Image(filename='04_image/57.png')
#We can choose a different subnet area for our cluster node.
#For the security group, I already have an "allow all" setting. Same for the key pair: I choose an existing one
#This template can be used as a single node
Image(filename='04_image/58.png')
#Set the username and password to "mongodbadmin". Based on price and purpose, we choose the general-purpose m4.large type, roughly $0.1/hour
#Change the volume size to the EC2 size of 8GB
Image(filename='04_image/59.png')
#Frankly, we should create an IAM role for this. For education purposes we can omit this step.
Image(filename='04_image/60.png')
Image(filename='04_image/61.png')
Amazon used MongoDB as the foundation to build its own clustered DocumentDB service.
You will need the AmazonDocDBFullAccess AWS managed policy.
#This is the basic structure of our access approach: we use EC2 as our access terminal to visit the DocumentDB cluster
Image(filename='04_image/09.png')
#search AWS DocumentDB
Image(filename='04_image/07.png')
#Make sure you choose the right server region and keep the same region all the time (I choose US East, N. Virginia)
#Click the create button
Image(filename='04_image/08.png')
#a is the cluster name
#b is the server type; choose the type you want to pay for
#c is the number of instances; for demonstration, we only use 1
Image(filename='04_image/04.png')
#The username is used to log into DocumentDB, so set it up and save it
#Master username = mongoadmin
#Master password = mongodbadmin
Image(filename='04_image/10.png')
#You can keep the other options as default and click create. It will take a few minutes
#We should change the default security group policy at this point. Click the cluster name
Image(filename='04_image/11.png')
#Click the default security group
Image(filename='04_image/12.png')
#Click Inbound rules (red) and edit inbound rules (blue)
Image(filename='04_image/13.png')
#For education purposes, we allow all network traffic and change the source to anywhere
Image(filename='04_image/14.png')
Remember to stop the cluster; it costs money while running
#Click Launch instance
Image(filename='04_image/15.png')
#For a convenient demo, we choose Ubuntu Server 18.04
Image(filename='04_image/16.png')
#We can use the free tier because this is just a connection terminal
#You can directly click Review and Launch; you can also check the VPC (we should make sure the cluster and EC2 are in the same VPC)
Image(filename='04_image/17.png')
#Every instance needs attention to its security group. If you find you can't access your AWS server, check the security group policy first
#For instance, SSH allows PuTTY and other terminals to control the server, and ICMP allows ping to check the network
Image(filename='04_image/18.png')
#When you click launch at the last step, it will remind you to create a key pair. Here we demonstrate how to create a new one
#The key pair is used to access EC2; you must keep the key pair file in a folder you remember.
#Don't lose this key pair, or you won't be able to access your EC2
Image(filename='04_image/19.png')
#The default download is *.pem, which can be used on Linux and Mac. If you want to use PuTTY on Windows, you need to convert it to *.ppk
Image(filename='04_image/20.png')
#After a few minutes, the EC2 instance will be ready and running. Copy the public DNS and public IP; this is the info you need to access EC2
Image(filename='04_image/21.png')
#On Windows, you can use PuTTY (download link: https://www.putty.org/)
#But first, you need to convert *.pem to *.ppk (PuTTY private key)
Image(filename='04_image/22.png')
#putty.exe is the access terminal; puttygen.exe is used for conversion
#Open puttygen.exe and click Load. Locate the *.pem file by choosing All Files (*.*)
Image(filename='04_image/23.png')
#After successfully loading the *.pem, click Save private key to get a *.ppk for PuTTY
Image(filename='04_image/24.png')
#Now we can use PuTTY to access EC2
#First, set the host name for the EC2 instance. Because we use Ubuntu, we replace "root" with "ubuntu".
#Second, copy the public DNS "ec2-52-72-13-133.compute-1.amazonaws.com" after the "@"
Image(filename='04_image/25.png')
#Third, click SSH -> Auth -> Browse -> choose EC2_0610.ppk
#Do not change the key pair name; the local key pair name should be identical to the AWS key name
Image(filename='04_image/26.png')
#Fourth, click Category -> Session, enter a NAME for this session, and save it. Next time you can just click the saved session
Image(filename='04_image/27.png')
#Finally, double-click the saved session or click Open. Click Yes in the PuTTY Security Alert window
Image(filename='04_image/28.png')
#If you get this window: congratulations, you succeeded
Image(filename='04_image/29.png')
#Enter the following commands in sequence
Image(filename='04_image/30.png')
#Go back to the cluster and check the connection information
#Replace the last part of the connection string with the password you set up
Image(filename='04_image/31.png')
#If you did all previous steps correctly, you will access AWS DocumentDB in this window
Image(filename='04_image/32.png')
In the former chapter, we only demonstrated how to connect on Windows. In this chapter, I will introduce how to connect to AWS EC2 from Mac or Linux. Microsoft has added a lot of Linux functionality to Windows 10, so we can actually use cmd like a Linux terminal; all my screenshots are from a Windows laptop, but the logic is identical.
#First, change to the *.pem directory: cd /d <full path>
Image(filename='04_image/35.png')
#You can actually use ssh directly in the Windows CMD, but you may face this warning
#On Linux, this is fixed by chmod 400 *.pem, but on Windows it's not as easy to change file permissions
Image(filename='04_image/44.png')
#Individual permissions are inherited from the parent folder; this is the original security configuration
#First, click "Disable inheritance". This will delete all inherited permissions.
Image(filename='04_image/45.png')
#Theoretically, only the current user should have permission on this *.pem file, so follow these clicks:
#1. Advanced 2. Add 3. Select a principal 4. Advanced 5. Object Types 6. Choose only Users 7. Find Now 8. Select your login user name
#9. OK, and we get the image in No.2, which means only the current user has permission on this file
Image(filename='04_image/46.png')
#ssh -i EC2_0610.pem ubuntu@ec2-52-72-13-133.compute-1.amazonaws.com
#With this command we can access EC2 on Windows without PuTTY
Image(filename='04_image/47.png')
In the previous chapters we learned how to install MongoDB on different platforms. Now we introduce how to connect to them with different tools.
MongoDB Compass can be installed along with MongoDB. It's a GUI terminal
#When you have successfully initialized Mongo Atlas, you can get the access info
Image(filename='04_image/62.png')
#Choose the right version and copy the connection string
Image(filename='04_image/63.png')
#remember to replace username and password
Image(filename='04_image/64.png')
Connect to Mongo Atlas with the Mongo Shell
#We can use the local mongo shell to connect to this cluster
Image(filename='04_image/65.png')
#Remember to replace <dbname> with any database name (you can actually use admin), and replace the username and password
#--username + parameter is the MongoDB command format.
#This is my connection string: mongo "mongodb+srv://cluster-test-03-62ji5.mongodb.net/covid" --username <username>
Image(filename='04_image/66.png')
We start by importing JSON into our cluster. This is the basic command:
mongoimport --host atlas-4ypdkm-shard-0/cluster4-shard-00-00.62ji5.mongodb.net:27017,cluster4-shard-00-01.62ji5.mongodb.net:27017,cluster4-shard-00-02.62ji5.mongodb.net:27017 \
  --ssl \
  --username mongoadmin \
  --password mongoadmin \
  --authenticationDatabase admin \
  --db covid \
  --collection jan-one \
  --type json \
  --file D:\Downloads\covid\16119_webhose_2020_01_db21c91a1ab47385bb13773ed8238c31_0000001.json
--host: this is a sharded cluster, so atlas-4ypdkm-shard-0 is the shard name and the rest are the names of the three cluster nodes
--ssl: use a secure (TLS) connection
--username / --password: database username and password
--authenticationDatabase: the database holding the user credentials (admin here)
--db: the name of the database you want to create
--collection: the name of the collection to create under the database (preferably not a number)
--type: JSON, CSV, or another format
--file: file path of the source data
#Click Command Line Tools and get the cluster connection info
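The options above can also be assembled in Python as an argument list for subprocess, which avoids shell quoting mistakes with the long host string. A sketch reusing the tutorial's example values (only one of the three cluster hosts is shown, and the file path is a placeholder you must replace):

```python
import shlex

# Compose the mongoimport call as an argument list; subprocess.run(cmd)
# would execute it without involving the shell.
cmd = [
    "mongoimport",
    "--host", "atlas-4ypdkm-shard-0/cluster4-shard-00-00.62ji5.mongodb.net:27017",
    "--ssl",
    "--username", "mongoadmin",
    "--password", "mongoadmin",
    "--authenticationDatabase", "admin",
    "--db", "covid",
    "--collection", "jan-one",
    "--type", "json",
    "--file", "data.json",  # placeholder: your local JSON file
]
# Printing shows the equivalent shell-quoted command line
print(" ".join(shlex.quote(a) for a in cmd))
```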
Image(filename='04_image/67.png')
Image(filename='04_image/68.png')
#When you succeed you will see the following result
Image(filename='04_image/69.png')
#If you are familiar with SQL and relational databases, you can use this table to compare
Image(filename='04_image/03.png')
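As a rough text version of that comparison (our own summary; the equivalences are approximate, not exact):

```python
# Rough SQL-to-MongoDB vocabulary map
sql_to_mongo = {
    "database": "database",
    "table": "collection",
    "row": "document",
    "column": "field",
    "primary key": "_id",
    "JOIN": "embedding / $lookup",
}
for sql_term, mongo_term in sql_to_mongo.items():
    print(f"{sql_term:12s} -> {mongo_term}")
```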
Connecting to Mongo Atlas with PyMongo, we can use Python to access the remote database directly. Here is the code.
import pymongo
import dns
import json
from pymongo import MongoClient
client_atlas = MongoClient("mongodb+srv://mongoadmin:mongoadmin@cluster4.62ji5.mongodb.net/admin?retryWrites=true&w=majority")
Image(filename='04_image/70.png')
client_atlas.list_database_names()
['streamdata', 'admin', 'local']
#Assign the sample_airbnb DB to db_1 (this is the database level)
db_1 = client_atlas.get_database('sample_airbnb')
#sample_airbnb has only one collection, so we assign the listingsAndReviews collection to collection_1
collection_1 = db_1.listingsAndReviews
collection_1.count_documents({})
0
#Build a list containing all documents
list_1 = list(collection_1.find())
mongod (if you added the MongoDB bin directory to the environment PATH)
mongo
This part contains two pieces: first, setting up Jupyter on EC2; second, local PyMongo basics
From the previous chapter, we know we can't access AWS DocumentDB from outside its VPC directly, except over an SSH tunnel. So we build a Jupyter notebook on EC2 and manipulate DocumentDB with the pymongo package
#Check the TLS status of the AWS DocumentDB cluster; it means we must use rds-combined-ca-bundle.pem to access our cluster at all times
#You can download the certificate bundle from this link: https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
#TLS AWS Document: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.SSL.html
Image(filename='04_image/36.png')
get Anaconda3 package
wget https://repo.anaconda.com/archive/Anaconda3-2020.02-Linux-x86_64.sh
Image(filename='04_image/33.png')
#The Anaconda startup configuration is in "/home/ubuntu/.bashrc"
Image(filename='04_image/34.png')
# In this step, the most important part is "modified",
# which decides the Anaconda initialization process.
# If this step is not correct, you can set it up manually:
# sudo source <path of anaconda>/bin/activate and then: conda init
# sudo source /home/ubuntu/anaconda3/bin/activate -> conda init
conda activate
ipython
#Set the password to admin and record the hash string
Image(filename='04_image/37.png')
'sha1:2b4e5095a913:2ff43c1fc73822d6c98dfa86cdef3b26fcbe2f3c'
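The hash string has the form sha1:salt:digest. Assuming IPython's passwd() scheme (our reading: the digest is the SHA-1 of passphrase + salt, with a 12-hex-character salt), a stdlib-only sketch to generate and check such a hash looks like this:

```python
import hashlib
import os

# Sketch of the 'sha1:salt:digest' format (assumption: digest = SHA-1 of
# passphrase concatenated with the salt, as IPython's passwd() does)
def make_hash(passphrase, salt=None):
    salt = salt or os.urandom(6).hex()  # 12 hex characters
    digest = hashlib.sha1((passphrase + salt).encode()).hexdigest()
    return f"sha1:{salt}:{digest}"

def verify(passphrase, stored):
    _algo, salt, _digest = stored.split(":")
    return make_hash(passphrase, salt) == stored

h = make_hash("admin")
print(verify("admin", h))   # True
print(verify("wrong", h))   # False
```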
'sha1:6741a01d5f68:c4f99e8a386c093b8f3123099f8b9a860dfa1fa5'
jupyter notebook --generate-config
mkdir certs
cd certs
openssl req -x509 -nodes -days 365 -newkey rsa:1024 -keyout mycert.pem -out mycert.pem
Image(filename='04_image/38.png')
cd ~/.jupyter/
vi jupyter_notebook_config.py
#When I press "ENTER" it opens the vim window
Image(filename='04_image/39.png')
"""
c = get_config()
# Kernel config
c.IPKernelApp.pylab = 'inline' # if you want plotting support always in your notebook
# Notebook config
c.NotebookApp.certfile = u'/home/ubuntu/jupyter/certs/mycert.pem' #location of your certificate file
c.NotebookApp.ip = '0.0.0.0'
c.NotebookApp.open_browser = False #so that the ipython notebook does not opens up a browser by default
c.NotebookApp.password = u'sha1:2b4e5095a913:2ff43c1fc73822d6c98dfa86cdef3b26fcbe2f3c' #the encrypted password we generated above
# Set the port to 8888, the port we set up in the AWS EC2 set-up
c.NotebookApp.port = 8888
"""
#Insert this at the beginning of the document
#Replace c.NotebookApp.password with your own hash string
"""
c = get_config()
# Kernel config
c.IPKernelApp.pylab = 'inline' # if you want plotting support always in your notebook
# Notebook config
c.NotebookApp.certfile = u'/home/ubuntu/certs/mycert.pem' #location of your certificate file
c.NotebookApp.ip = '0.0.0.0'
c.NotebookApp.open_browser = False #so that the ipython notebook does not opens up a browser by default
c.NotebookApp.password = u'sha1:6741a01d5f68:c4f99e8a386c093b8f3123099f8b9a860dfa1fa5' #the encrypted password we generated above
# Set the port to 8888, the port we set up in the AWS EC2 set-up
c.NotebookApp.port = 8888
"""
# press "i" to enter insert mode
# use "SHIFT+INSERT" to paste the text
# press "ESC" after editing, then enter ":wq" to save and quit
Image(filename='04_image/40.png')
Create folder for notebooks
cd ~
mkdir Notebooks
cd Notebooks
Create a new screen. This command creates a separate screen for your Jupyter process logs while you continue to do other work on the EC2 instance.
screen
sudo chown $USER:$USER /home/ubuntu/certs/mycert.pem
jupyter notebook
Image(filename='04_image/41.png')
https://ec2-52-72-13-133.compute-1.amazonaws.com:8888 password=admin
!conda install -c anaconda pymongo --yes
!conda install --yes -c anaconda dnspython
!conda install --yes -c jmcmurray json
import pymongo
import dns
import json
### 5.7.1 Jupyter Notebook on EC2
from pymongo import MongoClient
client = pymongo.MongoClient('mongodb://mongoadmin:mongoadmin@docdb-2020-06-11-03-01-07.cluster-crp0v4bay2br.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred')
client.list_database_names()
#Download *.pem into the Notebooks folder:
wget https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
#check endpoint
Image(filename='04_image/42.png')
Image(filename='04_image/43.png')
In this part we introduce basic PyMongo commands
import pymongo
import dns
import json
from pymongo import MongoClient
client_aws = MongoClient("mongodb://mongoadmin:mongoadmin@docdb-2020-06-11-03-01-07.cluster-crp0v4bay2br.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false")
client = pymongo.MongoClient('mongodb://mongoadmin:mongoadmin@docdb-2020-06-11-03-01-07.cluster-crp0v4bay2br.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred')
client.list_database_names()
#If you connect to a local MongoDB instead, any of the following work:
#client = MongoClient()
#client_local = MongoClient("localhost",27017)
#client_local = MongoClient('mongodb://localhost:27017/')
#This is the Mongo Atlas cloud database
client_cluster = MongoClient("mongodb://admin:<admin>@cluster-test-03-62ji5.mongodb.net/test:27017")
#client_cluster = pymongo.MongoClient("mongodb://admin:admin@cluster-test-03-62ji5.mongodb.net/test?retryWrites=true&w=majority")
#db = client.test
client_cluster = MongoClient("mongodb://admin:admin@cluster-test-03-62ji5.mongodb.net/test:27017")
client_cluster.list_database_names()
client_local = MongoClient('mongodb://localhost:27017/')
db = client_local['covid_db']
collection_1 = db['covid']
with open(r'C:\data\db\03_april28-may28.json') as f:
file_data = json.load(f)
# if pymongo < 3.0, use insert()
#collection_currency.insert(file_data)
# if pymongo >= 3.0 use insert_one() for inserting one document
collection_1.insert_one(file_data)
# if pymongo >= 3.0 use insert_many() for inserting many documents
#collection_currency.insert_many(file_data)
limited_result = collection_1.find().limit(5)
for _ in limited_result:
print(_)
IOPub data rate exceeded. The notebook server will temporarily stop sending output to the client in order to avoid crashing it. To change this limit, set the config variable `--NotebookApp.iopub_data_rate_limit`. Current values: NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec) NotebookApp.rate_limit_window=3.0 (secs)
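Since the comments above distinguish insert_one() from insert_many(), a small hedged helper (our own, not part of PyMongo) can dispatch based on whether the loaded JSON is one object or a list. A stand-in collection class is used here so the sketch runs without a live server:

```python
# Hedged helper: choose insert_one() or insert_many() depending on the
# shape of the loaded JSON data.
def insert_json(collection, data):
    if isinstance(data, list):
        return collection.insert_many(data)
    return collection.insert_one(data)

# Minimal stand-in for a PyMongo collection, just to show the dispatch.
class FakeCollection:
    def insert_one(self, doc):
        return "insert_one"
    def insert_many(self, docs):
        return "insert_many"

col = FakeCollection()
print(insert_json(col, {"a": 1}))              # insert_one
print(insert_json(col, [{"a": 1}, {"b": 2}]))  # insert_many
```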
limited_result = collection_1.find().limit(1)
for _ in limited_result:
    print(_)
db.list_collection_names()
['covid']
client_cluster.list_database_names()
client_local.list_database_names()
['01_testdb', '03_twitter_geo_db', 'admin', 'config', 'local']
MongoDB uses JSON documents to store data. In PyMongo we use dictionaries to represent documents
article_2 = {"author" : "Derrick Muiti",
"about" : "Introduction to MongoDB and Python",
"tags":["mongodb","python","pymongo"]}
To insert a document into a collection we use the insert_one() method
articles = db.articles
result = articles.insert_one(article)
When the document is inserted, a special key <code>_id</code> is generated that is unique to this document.
print("first article key is: {}".format(result.inserted_id))
first article key is: 5ecee6a084a8b58efad65267
#check the databases in our connection (client_atlas)
client_atlas.list_database_names()
['sample_airbnb', 'sample_analytics', 'sample_geospatial', 'sample_mflix', 'sample_restaurants', 'sample_supplies', 'sample_training', 'sample_weatherdata', 'admin', 'local']
#Create a test database (two methods). If we don't insert a new document into the database, it will not actually create a new database
db_2 = client_atlas['testdb_02']
#db_2 = client_atlas.testdb_02
#create a dict for insert
article_2 = {"author" : "Derrick Muiti",
"about" : "Introduction to MongoDB and Python",
"tags":["mongodb","python","pymongo"]}
#Create a collection (the next level below a database); first check whether this database contains other collections
db_2.list_collection_names()
[]
#A collection (table) is composed of documents (rows) and stored in a database
collection_02 = db_2.collection_02
#collection = db_2['test_collection']
db_2.list_collection_names()
[]
post_id = collection_02.insert_one(article_2)
post_id
<pymongo.results.InsertOneResult at 0x1c419525bc8>
#When the document is inserted, a special key <code>_id</code> is generated that is unique to this document.
print("first article key is: {}".format(post_id.inserted_id))
first article key is: 5ed98bd3b0e4334cf9fc7309
db_2.list_collection_names()
['collection_02']
A collection is created only after the first document is inserted. We can confirm this using the list_collection_names() method
db_1.list_collection_names()
['listingsAndReviews']
We can insert multiple documents to a collection using the insert_many() method as shown below:
article_1 = {"author": "Emanuel Kens",
"about" : "Ken and Python",
"tags":["ken","pymongo"]}
article_2 = {"author" : "Daniel Kimeli",
"about" : "Web Development and Python",
"tags" : ["web", "design", "HTML"]}
new_articles = collection_02.insert_many([article_1,article_2])
print("the new article IDs are {}".format(new_articles.inserted_ids))
the new article IDs are [ObjectId('5ed9b16ab0e4334cf9fc730a'), ObjectId('5ed9b16ab0e4334cf9fc730b')]
find_one() returns a single document matching the query, or None if no document matches. It returns the first match it comes across. When we call the method below, we get the first article we inserted into our collection.
print(collection_02.find_one())
{'_id': ObjectId('5ed98bd3b0e4334cf9fc7309'), 'author': 'Derrick Muiti', 'about': 'Introduction to MongoDB and Python', 'tags': ['mongodb', 'python', 'pymongo']}
for article in articles.find():
print(article)
{'_id': ObjectId('5ecee6a084a8b58efad65267'), 'author': 'Derrick Muiti', 'about': 'Introduction to MongoDB and Python', 'tags': ['mongodb', 'python', 'pymongo']}
{'_id': ObjectId('5eceedf684a8b58efad65268'), 'author': 'Emanuel Kens', 'about': 'Ken and Python', 'tags': ['ken', 'pymongo']}
{'_id': ObjectId('5eceedf684a8b58efad65269'), 'author': 'Daniel Kimeli', 'about': 'Web Development and Python', 'tags': ['web', 'design', 'HTML']}
#When building web applications, we usually get document IDs from the URL and try to retrieve them from our MongoDB collection.
#To achieve this, we first have to convert the obtained string ID into an <code>ObjectId</code>
from bson.objectid import ObjectId
def get(post_id):
    document = client.db.collection.find_one({'_id': ObjectId(post_id)})
    return document
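As a side note, an ObjectId also encodes its creation time: it is 12 bytes, and the first 4 bytes are a big-endian Unix timestamp. A stdlib-only sketch (no bson required) that reads the time out of a hex _id string:

```python
from datetime import datetime, timezone

# The first 4 of an ObjectId's 12 bytes are a big-endian Unix timestamp,
# so the creation time can be read straight from the hex string.
def objectid_time(hex_id):
    seconds = int.from_bytes(bytes.fromhex(hex_id)[:4], "big")
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

# _id taken from one of the inserted articles above
print(objectid_time("5ecee6a084a8b58efad65267"))
```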
#MongoDB uses a different way to select fields (columns): 0 means do not fetch, 1 means fetch
for doc in collection_02.find({},{"_id":0, "author":1, "about":1}):
print(doc)
{'author': 'Derrick Muiti', 'about': 'Introduction to MongoDB and Python'}
{'author': 'Emanuel Kens', 'about': 'Ken and Python'}
{'author': 'Daniel Kimeli', 'about': 'Web Development and Python'}
We can use the sort() method to sort the results in ascending or descending order. The default order is ascending. We use 1 for ascending and -1 for descending.
doc = articles.find().sort("author", -1)
for x in doc:
print(x)
{'_id': ObjectId('5eceedf684a8b58efad65268'), 'author': 'Emanuel Kens', 'about': 'Ken and Python', 'tags': ['ken', 'pymongo']}
{'_id': ObjectId('5ecee6a084a8b58efad65267'), 'author': 'Derrick Muiti', 'about': 'Introduction to MongoDB and Python', 'tags': ['mongodb', 'python', 'pymongo']}
{'_id': ObjectId('5eceedf684a8b58efad65269'), 'author': 'Daniel Kimeli', 'about': 'Web Development and Python', 'tags': ['web', 'design', 'HTML']}
We update a document using the update_one() method. The first parameter of this method is a query object defining the document to update. If the method finds more than one document, it only updates the first one. Let's update the name of the author in the article written by Derrick.
query = {"author":"Derrick Muiti"}
new_author = {"$set": {"author":"John David"}}
collection_02.update_one(query, new_author)
<pymongo.results.UpdateResult at 0x1c41952ac48>
for doc in collection_02.find():
print(doc)
{'_id': ObjectId('5ed98bd3b0e4334cf9fc7309'), 'author': 'John David', 'about': 'Introduction to MongoDB and Python', 'tags': ['mongodb', 'python', 'pymongo']}
{'_id': ObjectId('5ed9b16ab0e4334cf9fc730a'), 'author': 'Emanuel Kens', 'about': 'Ken and Python', 'tags': ['ken', 'pymongo']}
{'_id': ObjectId('5ed9b16ab0e4334cf9fc730b'), 'author': 'Daniel Kimeli', 'about': 'Web Development and Python', 'tags': ['web', 'design', 'HTML']}
MongoDB lets us limit the result of our query using the limit() method. In the query below we limit the result to two records.
limited_result = collection_02.find().limit(2)
for _ in limited_result:
print(_)
{'_id': ObjectId('5ed98bd3b0e4334cf9fc7309'), 'author': 'John David', 'about': 'Introduction to MongoDB and Python', 'tags': ['mongodb', 'python', 'pymongo']}
{'_id': ObjectId('5ed9b16ab0e4334cf9fc730a'), 'author': 'Emanuel Kens', 'about': 'Ken and Python', 'tags': ['ken', 'pymongo']}
client = MongoClient('localhost', 27017)
#Create a database named "03_twitter_geo_db"
db = client['03_twitter_geo_db']
#In this database, create a collection (table) named "twitter_geo"
twitter_geo = db['twitter_geo']
with open(r'C:\data\db\03_april28-may28.json') as f:
    file_data = json.load(f)
twitter_geo.insert(file_data)
D:\ProgramData\Anaconda3\envs\03_ten1131_keras231\lib\site-packages\ipykernel_launcher.py:1: DeprecationWarning: insert is deprecated. Use insert_one or insert_many instead. """Entry point for launching an IPython kernel.
ObjectId('5ecfbe5a84a8b58efad6526e')
client.list_database_names()
['01_testdb', '03_twitter_geo_db', 'admin', 'config', 'local']
# if pymongo < 3.0, use insert()
collection_currency.insert(file_data)
# if pymongo >= 3.0 use insert_one() for inserting one document
collection_currency.insert_one(file_data)
# if pymongo >= 3.0 use insert_many() for inserting many documents
collection_currency.insert_many(file_data)
db = client_local['covid_db']
collection_1 = db['covid']
with open(r'C:\data\db\03_april28-may28.json') as f:
    file_data = json.load(f)
# if pymongo < 3.0, use insert()
#collection_currency.insert(file_data)
# if pymongo >= 3.0 use insert_one() for inserting one document
collection_1.insert_one(file_data)
# if pymongo >= 3.0 use insert_many() for inserting many documents
#collection_currency.insert_many(file_data)
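One gotcha with the insert_one/insert_many split above: json.load can return either a single dict (one document) or a list (many documents), so the method to call depends on the loaded type. A minimal sketch of that dispatch — note that insert_json and FakeCollection are illustrative names of our own, not pymongo API; the stub only exists so the sketch runs without a MongoDB server:

```python
import json

def insert_json(collection, data):
    # a JSON file can hold one document (dict) or many (list);
    # dispatch to the matching pymongo >= 3.0 method
    if isinstance(data, list):
        return collection.insert_many(data)
    return collection.insert_one(data)

# tiny stand-in so this sketch runs without a MongoDB server;
# a real pymongo Collection exposes the same two methods
class FakeCollection:
    def insert_one(self, doc):
        return ('one', 1)
    def insert_many(self, docs):
        return ('many', len(docs))

col = FakeCollection()
print(insert_json(col, json.loads('{"a": 1}')))              # single document
print(insert_json(col, json.loads('[{"a": 1}, {"b": 2}]')))  # list of documents
```

With a real pymongo Collection in place of FakeCollection, the same dispatch avoids the deprecated catch-all insert() seen earlier.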
limited_result = collection_1.find().limit(5)
for _ in limited_result:
    print(_)
limited_result = collection_1.find().limit(1)
for _ in limited_result:
    print(_)
db.list_collection_names()
client_cluster.list_database_names()
client_local.list_database_names()
user:mongoadmin
password: mongoadmin
create time: 06102020
EC2 public DNS: ubuntu@ec2-52-72-13-133.compute-1.amazonaws.com
Putty: putty -ssh -i EC2_0610.ppk ubuntu@ec2-52-72-13-133.compute-1.amazonaws.com
Copy from local to EC2: scp -i EC2_0610.pem D:\Downloads\covid\16119_webhose_2020_01_db21c91a1ab47385bb13773ed8238c31_0000001.json ubuntu@ec2-52-72-13-133.compute-1.amazonaws.com:/home/ubuntu/Notebooks
Jupyter notebook:https://ec2-52-72-13-133.compute-1.amazonaws.com:8888/
Jupyter Password:admin
MongoDB Username:mongoadmin MongoDB Password:mongoadmin
IMPORT JSON INTO DocumentDB:
mongoimport --ssl \
  --host docdb-2020-06-11-03-01-07.cluster-crp0v4bay2br.us-east-1.docdb.amazonaws.com:27017 \
  --sslCAFile rds-combined-ca-bundle.pem \
  --username=mongoadmin \
  --password=mongoadmin \
  --collection=col-data \
  --db=covid \
  --file=16119_webhose_2020_01_db21c91a1ab47385bb13773ed8238c31_0000001.json \
  --numInsertionWorkers 4
mongoimport --uri "mongodb://mongoadmin:mongoadmin@docdb-2020-06-11-03-01-07.cluster-crp0v4bay2br.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-bundle.pem&replicaSet=myAtlasRS&authSource=admin" -d covid -c coldata --file 16119_webhose_2020_01_db21c91a1ab47385bb13773ed8238c31_0000001.json --type json --ssl
mongoimport --host docdb-2020-06-11-03-01-07/docdb-2020-06-11-03-01-07.cluster-crp0v4bay2br.us-east-1.docdb.amazonaws.com:27017 --ssl --sslCAFile rds-combined-ca-bundle.pem --username mongoadmin --password mongoadmin --db covid --collection 20200101 --type json --file 16119_webhose_2020_01_db21c91a1ab47385bb13773ed8238c31_0000001.json
mongo --ssl \
  --host docdb-2020-06-11-03-01-07.cluster-crp0v4bay2br.us-east-1.docdb.amazonaws.com:27017 \
  --sslCAFile rds-combined-ca-bundle.pem \
  --username mongoadmin \
  --password mongoadmin
**ATLAS**
mongo "mongodb+srv://cluster-test-03-62ji5.mongodb.net/sample_airbnb" --username admin --password admin
mongoimport --host Cluster-test-03-shard-0/cluster-test-03-shard-00-00-62ji5.mongodb.net:27017,cluster-test-03-shard-00-01-62ji5.mongodb.net:27017,cluster-test-03-shard-00-02-62ji5.mongodb.net:27017 --ssl --username admin --password admin --authenticationDatabase admin --db covid --collection 20200101 --type json --file 16119_webhose_2020_01_db21c91a1ab47385bb13773ed8238c31_0000001.json
This tutorial will introduce how to install Apache Kafka, Cassandra and Kinesis. Here is reference list:
Cassandra is an open source NoSQL database. To avoid compatibility problems, we recommend using Ubuntu 20.04 as the operating system. We build a one-node Cassandra cluster first and then configure it into a two-node cluster. We use AWS to demonstrate.
If you need more specific detail, please check the official documentation: https://cassandra.apache.org
IMPORTANT! With the default settings, the OS must have at least 8 GB of memory
Image(filename='04_image/114.png')
#launch an Ubuntu 20.04 instance with 8 GB of memory
Image(filename='04_image/71.png')
# according to current prices, you can choose any instance type with 8 GB
# due to the memory requirement, we can't use a free-tier EC2 instance. Please make sure billing is set up.
Image(filename='04_image/72.png')
#For AWS launch and configuration, check the other tutorials; we won't go into detail here
Image(filename='04_image/73.png')
# Cassandra is based on Java, so we have to make sure the right Java version is installed. Check the Java version.
# If you just initialized a new server, Java may not be installed yet
Image(filename='04_image/74.png')
#we have two choices of Java: Oracle Java Standard Edition 8 or OpenJDK 8. We choose the OpenJDK version here
#Here is OpenJDK offical site: http://openjdk.java.net/
#If you face this error, try to sudo apt-get update first
Image(filename='04_image/75.png')
Make sure the Java environment is installed: sudo apt-get install openjdk-8-jre-headless
#after update, you can install openjdk-8-jre
Image(filename='04_image/76.png')
#it will take a few minutes to install OpenJDK. After it's done, you can check java -version again
#please remember the install path in the red box: /usr/lib/jvm/java-8-openjdk-amd64. We will use it to set the environment variable
Image(filename='04_image/77.png')
Verify your Java install path with pwd
/usr/lib/jvm/java-8-openjdk-amd64
#If you followed the above instructions, you should get this output
Image(filename='04_image/78.png')
Configure an environment variable so Cassandra can find the correct Java version: sudo nano ~/.bashrc
#configure the environment variable so Cassandra can find the correct Java version
Image(filename='04_image/79.png')
#scroll down to the bottom and add export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
#CTRL+S to save and CTRL+X to exit
Image(filename='04_image/80.png')
#restart bash and verify the environment. If you get this result, your environment variable is set correctly
#exec bash
#echo $JAVA_HOME
Image(filename='04_image/81.png')
#Typically, installing the binary version is simplest, but that way you can't configure a service, so I recommend the Debian package
#sudo apt install curl
Image(filename='04_image/82.png')
#Add the Apache repository of Cassandra to the file cassandra.sources.list. 4.0 is a beta version, so we use 3.11
#echo "deb http://downloads.apache.org/cassandra/debian 311x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
#You should get output like this: deb http://downloads.apache.org/cassandra/debian 311x main
Image(filename='04_image/83.png')
#Add the Apache Cassandra repository keys to the list of trusted keys on the server:
#curl https://downloads.apache.org/cassandra/KEYS | sudo apt-key add -
Image(filename='04_image/84.png')
#Update the package index from sources:
#sudo apt-get update
Image(filename='04_image/85.png')
#Install Cassandra with APT:
#sudo apt-get install cassandra
Image(filename='04_image/86.png')
#The Cassandra service gets started automatically after installation. Monitor the progress of the startup with:
#tail -f /var/log/cassandra/system.log
#sudo service cassandra status
Image(filename='04_image/87.png')
#Check the status of Cassandra: the status column in the output should report UN, which stands for “Up/Normal”.
#nodetool status
Image(filename='04_image/88.png')
#Alternatively, connect to the database with:
#cqlsh
Image(filename='04_image/89.png')
If you face issues where Cassandra cannot start correctly, I recommend using the logs for troubleshooting. The default log path is /var/log/cassandra/, and you can use grep 'WARN\|ERROR' /var/log/cassandra/system.log | tail to filter Java WARN and ERROR messages
Image(filename='04_image/90.png')
The default install path of Cassandra is /etc/cassandra, and most of the Cassandra configuration is written in /etc/cassandra/cassandra.yaml
TBD
If cassandra-driver can't create a keyspace, you can use cqlsh to create the keyspace
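As a sketch, the keyspace referenced later by the insert query (keyspace01) can be created from cqlsh like this; SimpleStrategy with replication_factor 1 is the minimal single-node setting, so adjust it for a real multi-node cluster:

```
CREATE KEYSPACE IF NOT EXISTS keyspace01
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
```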
TBD
some basic steps
sudo apt-get install openjdk-8-jre-headless
pwd
/usr/lib/jvm/java-8-openjdk-amd64
sudo nano ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
exec bash
echo $JAVA_HOME
sudo apt install curl
echo "deb http://downloads.apache.org/cassandra/debian 40x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list

The Kafka official website only provides binary installation. We recommend setting Kafka up as a service. Here is a reference link: https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-18-04
#Get to /home/ubuntu and download kafka binaries
#wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
Image(filename='04_image/91.png')
#Extract the archive you downloaded using the tar command:
#tar -xzf kafka_2.12-2.5.0.tgz
Image(filename='04_image/92.png')
Kafka’s default behavior will not allow us to delete a topic (the category, group, or feed name to which messages can be published). To modify this, let’s edit the configuration file. Kafka’s configuration options are specified in server.properties. Open this file with nano or your favorite editor:
#nano config/server.properties
Image(filename='04_image/93.png')
#Let’s add a setting that will allow us to delete Kafka topics. Add the following to the bottom of the file:
#delete.topic.enable = true
#CTRL+S to save and CTRL+X to quit
Image(filename='04_image/94.png')
In this section, we will create systemd unit files for the Kafka service. This will help us perform common service actions such as starting, stopping, and restarting Kafka in a manner consistent with other Linux services. Zookeeper is a service that Kafka uses to manage its cluster state and configurations. It is commonly used in many distributed systems as an integral component. If you would like to know more about it, visit the official Zookeeper docs.
# Create the unit file for zookeeper:
# sudo nano /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/home/ubuntu/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh /home/ubuntu/kafka_2.12-2.5.0/config/zookeeper.properties
ExecStop=/home/ubuntu/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
Image(filename='04_image/95.png')
# sudo systemctl start zookeeper
The [Unit] section specifies that Zookeeper requires networking and the filesystem to be ready before it can start.
The [Service] section specifies that systemd should use the zookeeper-server-start.sh and zookeeper-server-stop.sh shell files for starting and stopping the service. It also specifies that Zookeeper should be restarted automatically if it exits abnormally.
sudo nano /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
ExecStart=/bin/sh -c '/home/ubuntu/kafka_2.12-2.5.0/bin/kafka-server-start.sh /home/ubuntu/kafka_2.12-2.5.0/config/server.properties > /home/ubuntu/kafka_2.12-2.5.0/kafka.log 2>&1'
ExecStop=/home/ubuntu/kafka_2.12-2.5.0/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
The [Unit] section specifies that this unit file depends on zookeeper.service. This will ensure that zookeeper gets started automatically when the kafka service starts.
The [Service] section specifies that systemd should use the kafka-server-start.sh and kafka-server-stop.sh shell files for starting and stopping the service. It also specifies that Kafka should be restarted automatically if it exits abnormally.
Image(filename='04_image/96.png')
#Now that the units have been defined, start Kafka with the following command:
#sudo systemctl start kafka
#To ensure that the server has started successfully, check the journal logs for the kafka unit:
#sudo journalctl -u kafka
#While we have started the kafka service, if we were to reboot our server, it would not be started automatically.
#sudo systemctl enable kafka
Image(filename='04_image/97.png')
We use Python as our glue code for web mining and to simulate the consumer and producer. If you use Jupyter Notebook, you can install kafka-python with conda install -c conda-forge kafka-python
Then you can import kafka in jupyter notebook:
from kafka import KafkaProducer
from kafka.errors import KafkaError
from kafka import KafkaConsumer
Typically, we can use the Kafka command line to send messages as a producer: (REMEMBER to change topic03 to your own topic)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic03
Image(filename='04_image/99.png')
In the meantime, you can see the topic's messages with the consumer command line: (Change topic03 to your topic name)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic03 --from-beginning
Image(filename='04_image/100.png')
So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (still all on our local machine).
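The usual recipe (following the Kafka quickstart; paths here are fabricated for illustration, so substitute your own install directory) is to copy server.properties once per extra broker and give each copy a unique broker.id, listener port, and log directory. A runnable sketch of just the config-derivation step:

```shell
# sketch only: fabricate a base config, then derive per-broker copies
# (property names follow the Kafka quickstart; adjust paths to your install)
mkdir -p /tmp/kafka-demo
cat > /tmp/kafka-demo/server.properties <<'EOF'
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
EOF
for i in 1 2; do
  cp /tmp/kafka-demo/server.properties /tmp/kafka-demo/server-$i.properties
  # each broker needs a unique id, port, and log directory
  sed -i "s/^broker.id=0/broker.id=$i/" /tmp/kafka-demo/server-$i.properties
  sed -i "s#^listeners=PLAINTEXT://:9092#listeners=PLAINTEXT://:909$((2+i))#" /tmp/kafka-demo/server-$i.properties
  sed -i "s#^log.dirs=/tmp/kafka-logs#log.dirs=/tmp/kafka-logs-$i#" /tmp/kafka-demo/server-$i.properties
done
grep broker.id /tmp/kafka-demo/server-*.properties
```

You would then start each extra broker with bin/kafka-server-start.sh pointed at its own server-N.properties file.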
TBD
# Go to Amazon Kinesis Services page
# Create a delivery stream
Image(filename='Picture1.jpg')
Step 1: Name and source
#Enter a delivery stream name
Image(filename='Picture2.jpg')
#Choose the first option to send records directly to the delivery stream, or to send records from AWS IoT, CloudWatch Logs, or CloudWatch Events
#In this example, we send records directly to the delivery stream
Image(filename='Picture3.jpg')
Step 2: Process records
#In this example we choose the Disabled option
Image(filename='Picture4.jpg')
Step 3: Choose a destination
# In this example, we store all the data into Amazon S3
Image(filename='Picture5.jpg')
# Select an S3 bucket or use "create new" to create a new bucket to store your data
Image(filename='Picture6-1.jpg')
Step 4: Configure settings
#Kinesis Firehose is near real time; the minimum buffer interval is 60 seconds
#In this example, the buffer flushes when it reaches 5 MB or 60 seconds. You can set it according to your data
Image(filename='Picture7.jpg')
# create an IAM role
Image(filename='Picture8.jpg')
Step 5: Review
Image(filename='Picture9.jpg')
#Click launch instance
Image(filename='Picture10.jpg')
Step 1: Choose an Amazon Machine Image (AMI)
#Select Amazon Linux AMI 2018.03.0 (HVM), SSD Volume Type - ami-09d8b5222f2b93bf0
Image(filename='Picture11.jpg')
Step 2: Choose an Instance Type
#Select t2.micro (Variable ECUs, 1 vCPUs, 2.5 GHz, Intel Xeon Family, 1 GiB memory, EBS only)
Image(filename='Picture12.png')
# After Download Key Pair, you can launch instances
Image(filename='Picture13.png')
#Click Connect to see how to connect to the instance
Image(filename='Picture14.png')
Step 1: convert the key pair. If you don't know how to do this, click here
#convert key pair from *.pem to *.ppk
Step 2: connect to the instance with the PuTTY Configuration. If you don't know how to do this, click here
# Enter ec2-user to start the instance
Image(filename='Picture21.png')
# Install Kinesis Client Application
# Enter: "sudo yum install -y aws-kinesis-agent"
Image(filename='Picture22.png')
Upload local data
Image(filename='Picture23.png')
Upload online data
# Change mode of LogGenerator.py
# Enter"chmod a+x LogGenerator.py"
[ec2-user@ip-172-31-37-11 ~]$ chmod a+x LogGenerator.py
# Create log directory
# Enter"sudo mkdir /var/log/cadabra"
[ec2-user@ip-172-31-37-11 ~]$ sudo mkdir /var/log/cadabra
# Go to aws-kinesis folder
# cd /etc/aws-kinesis/
[ec2-user@ip-172-31-37-11 ~]$ cd /etc/aws-kinesis/
# Open the editor of configuration file
# sudo nano agent.json
[ec2-user@ip-172-31-37-11 aws-kinesis]$ sudo nano agent.json
# Edit the agent.json as below
# Ctrl^O to write out
# Ctrl^X to exit
Image(filename='Picture24.png')
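For reference, a minimal agent.json in the format the Kinesis agent expects might look like the sketch below; the endpoint region and the deliveryStream name are placeholders, so use the delivery stream you created earlier, while the filePattern matches the /var/log/cadabra folder used in this section:

```
{
  "cloudwatch.emitMetrics": true,
  "firehose.endpoint": "firehose.us-east-1.amazonaws.com",
  "flows": [
    {
      "filePattern": "/var/log/cadabra/*.log",
      "deliveryStream": "my-delivery-stream"
    }
  ]
}
```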
start up the agent
# Start the agent
# Enter "sudo service aws-kinesis-agent start"
[ec2-user@ip-172-31-37-11 aws-kinesis]$ sudo service aws-kinesis-agent start
# Make the agent start up automatically when we start our instance
# Enter"sudo chkconfig aws-kinesis-agent on"
[ec2-user@ip-172-31-37-11 aws-kinesis]$ sudo chkconfig aws-kinesis-agent on
#Put data into S3
# In this case we put 5000 records from the source csv file into S3
[ec2-user@ip-172-31-37-11 aws-kinesis]$ cd ~
[ec2-user@ip-172-31-37-11 ~]$ sudo ./LogGenerator.py 5000
#View the processing
[ec2-user@ip-172-31-37-11 ~]$ cd /var/log/cadabra/
[ec2-user@ip-172-31-37-11 cadabra]$ ls
20200715-220217.log
[ec2-user@ip-172-31-37-11 cadabra]$ tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
#Now, you can find the data in your S3 bucket
#We use a simple pipeline to construct Kinesis Firehose streaming
Image(filename='04_image/120.png')
#make sure OS version is correct
#cat /etc/os-release
Image(filename='04_image/119.png')
# you can check the other part of the tutorial about how to set up Jupyter
#we use jupyter notebook to run a web mining program
#If we want kinesis-agent to read the file and transfer it, we need to add '\n' between lines
#kinesis-agent is monitoring /var/log/cadabra/*.log
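A quick look at what the newline-insertion regex in the script does: it inserts '\n' after every '.' or ',' that is immediately followed by a non-whitespace character, which splits the one-line JSON payload into lines the agent can read.

```python
import re

# one-line JSON payload, like the Citi Bike endpoint returns
raw = '{"a":1,"b":2}'
# insert a newline after each '.' or ',' that is followed by a non-space character
split = re.sub(r'(?<=[.,])(?=[^\s])', '\n', raw)
print(split)
```

Note that because whitespace after the comma suppresses the insertion, pretty-printed JSON would pass through mostly unchanged.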
#!/home/ec2-user/anaconda3/bin/python
# -*- coding: utf-8 -*-
import csv
import time
import sys
import json
import re
import requests
if __name__=='__main__':
    count = 0
    while True:
        #get a JSON-type file from the Citi Bike resource
        r = requests.get('https://gbfs.citibikenyc.com/gbfs/en/station_status.json')
        #store it in data
        #soup = BeautifulSoup(text, 'html.parser')
        test = "data.log"
        dest_data = time.strftime("/var/log/cadabra/%Y%m%d-%H%M%S.log")
        data_3 = re.sub(r'(?<=[.,])(?=[^\s])', r'\n', r.text)
        with open(dest_data,'w') as f:
            n = f.write(data_3)
        count += 1
        time.sleep(3600)
class JSONObject:
    def __init__(self, d):
        self.__dict__ = d

#data = json.loads(r.content, object_hook=JSONObject)
if __name__=='__main__':
    count = 0
    while count<3:
        #get a JSON-type file from the Citi Bike resource
        r = requests.get('https://gbfs.citibikenyc.com/gbfs/en/station_status.json')
        #store it in data
        #soup = BeautifulSoup(text, 'html.parser')  # 'text' is undefined here, so keep this disabled
        test = "data.log"
        dest_data = time.strftime("/var/log/cadabra/%Y%m%d-%H%M%S.log")
        data_3 = re.sub(r'(?<=[.,])(?=[^\s])', r'\n', r.text)
        with open(test,'w') as f:
            n = f.write(data_3)
        count += 1
        time.sleep(2)
#data_3 = re.sub(r'(?<=[.,])(?=[^\s])', r'\n', r.text)
#data_4 = pd.read_csv(data_3, sep="\n")
#resp = json.loads(r.content.decode('utf-8'))
#cd /etc/aws-kinesis/
#nano agent.json
Image(filename='04_image/121.png')
#after we create the file, you can find the data delivered through Kinesis Firehose to S3 storage
Summary
- /home/ec2-user/: chmod a+x LogGenerator.py — this is the code that scrapes data from the source and stores it into the target folder for the Kinesis agent to read
- cd /var/log/cadabra/ — this is the folder the Kinesis agent reads; any *.log files here will be picked up by the agent
- /etc/aws-kinesis/agent.json — this is the agent's own configuration
- sudo service aws-kinesis-agent status
- sudo chkconfig aws-kinesis-agent on
- tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

client_atlas = MongoClient("mongodb+srv://mongoadmin:mongoadmin@cluster4.62ji5.mongodb.net/admin?retryWrites=true&w=majority")
Image(filename='04_image/70.png')
client_atlas.list_database_names()
db_1 = client_atlas.get_database('sample_airbnb')
collection_1 = db_1.listingsAndReviews
collection_1.count_documents({})
list_1 = list(collection_1.find())
| Sequence | Tools | Process |
|---|---|---|
| Step 1 | Python | Use Requests to get JSON raw data |
| Step 2 | Python | Transform raw data into binary data |
| Step 3 | kafka-python | Send binary data as Kafka producer |
| Step 4 | kafka-python | Receive binary data as Kafka consumer |
| Step 5 | cassandra-driver | Read topic data from Kafka |
| Step 6 | cassandra-driver | Write data into Cassandra database |
According to the Kafka mechanism, every message sent by a producer is categorized into a topic. We can use this code to check the existing topics and consumer groups.
The output will be the group names, which will be used in the consumer settings
from kafka import BrokerConnection
from kafka.protocol.admin import *
import socket
#connect to local Broker
bc = BrokerConnection('localhost', 9092, socket.AF_INET)
bc.connect_blocking()
#print all topic and group name currently
list_groups_request = ListGroupsRequest_v1()
future = bc.send(list_groups_request)
while not future.is_done:
    for resp, f in bc.recv():
        f.success(resp)
for group in future.value.groups:
    print(group)
('my-group', 'consumer')
('group0001', 'consumer')
('console-consumer-90135', 'consumer')
Let's try to scrape data from the Citi Bike system. Here is the link: https://gbfs.citibikenyc.com/gbfs/en/station_status.json
import socket
import json
import requests
import urllib3
import re
from kafka import KafkaProducer
from kafka.errors import KafkaError
from kafka import KafkaConsumer
from bs4 import BeautifulSoup
#use for unix time stamp conversion
from datetime import datetime
#transform time zone
from pytz import timezone
#use requests to get the raw result and store it in r_01
r_01 = requests.get('https://gbfs.citibikenyc.com/gbfs/en/station_status.json')
#It's not a typical HTML page; we use the json package to parse the raw response and convert the string into a dictionary
def json_transform(jsonstring):
    msg = json.loads(jsonstring)
    return msg
#r_01.text is a Unicode string; json_transform converts the JSON string into a dictionary. data_02 will be a dictionary
data_02 = json_transform(r_01.text)
#use regex to get the very specific data we want
#this regex extracts entries starting with 'station_id' followed by two or more digits \d{2,}
data_id = re.findall(r"\'station_id\': \'\d{2,}\'", str(data_02))
#this regex extracts the timestamp of each record
data_time = re.findall(r"\'last_reported\': \d+", str(data_02))
#if we get the correct data, both lists should have the same length
if len(data_id)==len(data_time):
    print("we have {} records".format(len(data_id)))
else:
    print("Error, len of data_id and data_time is different")
#only extract the value in station_id
data_ID = re.findall("\d{2,}", str(data_id))
#only extract the time value in last_reported
data_timestamp = re.findall("\d+", str(data_time))
#create a new list to store the standard timestamp format
data_daytime=[]
#timezone is not working well for now
eastern = timezone('US/Eastern')
for n in range(0,len(data_timestamp)):
    data_int = int(data_timestamp[n])
    data_utc = datetime.utcfromtimestamp(data_int)
    #change from UTC to EST
    data_daytime.append(eastern.localize(data_utc).strftime('%Y-%m-%d %H:%M:%S'))
#data_daytime = datetime.fromtimestamp(timestamp)
#print(data_daytime)
#this is a very simple version; you can change it and put it into a loop to output all timestamps
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('topic03', data_daytime[0].encode('utf-8'))
we have 1037 records
<kafka.producer.future.FutureRecordMetadata at 0x7f5d1c3e0290>
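The regex-over-str(dict) approach above works, but since the payload is already parsed JSON, walking the structure directly is a more robust alternative. A sketch under the assumption that the GBFS schema keeps stations under data.stations (extract_station_times and the sample payload are our own illustrative names, not part of the tutorial's code):

```python
import datetime as dt

# a two-station stand-in for the real station_status.json payload
sample = {"data": {"stations": [
    {"station_id": "72", "last_reported": 1596320049},
    {"station_id": "79", "last_reported": 1596320050},
]}}

def extract_station_times(payload):
    # read the fields from the parsed JSON instead of regexing over str(dict)
    out = []
    for st in payload["data"]["stations"]:
        ts = dt.datetime.fromtimestamp(int(st["last_reported"]), dt.timezone.utc)
        out.append((st["station_id"], ts.strftime('%Y-%m-%d %H:%M:%S')))
    return out

print(extract_station_times(sample))
```

This avoids the fragile dependency on Python's dict repr format and keeps station_id and last_reported paired per record by construction.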
When you run the above code, you perform the following tasks:
You can use this command to monitor a specific topic: (Change topic03 to your topic name)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic03 --from-beginning
#open a new terminal on the side to monitor the consumer status
Image(filename='04_image/98.png')
Of course, you can write Python code to monitor the consumer. Open a terminal, start Jupyter Notebook, and paste the following code as a separate Python notebook, for instance consumer.ipynb. kafka-python provides a consumer model to monitor the output.
from cassandra.cluster import Cluster
consumer = KafkaConsumer('topic03',
                         group_id=None,
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest')
consumer_time=[]
for msg in consumer:
    consumer_time.append(msg.value)
    print("value=%s"%(msg.value))
cluster = Cluster()
session = cluster.connect('test_keyspace')
insert_query = "insert into keyspace01.table01 (stattion,status) values(%s,%s)"
#switch to database keyspace01
session.execute("USE keyspace01")
session.execute(insert_query,(str(consumer_time[2]),str(consumer_time[3])))
value=b'1596320049' value=b'1596320049' value=b'Hello world!' value=b'2020-08-02 02:23:45'
KeyboardInterrupt traceback (truncated): the consumer loop blocks waiting for new messages and runs until the kernel is interrupted; the values above were received before the interrupt.
from cassandra.cluster import Cluster
cluster = Cluster()
rows = session.execute('SELECT stattion, status FROM table01')
for row in rows:
    print(row.stattion, row.status)
bob 56 b'1596320049' b'1596320049' 72 72 sam 56 david Beld
import socket
import json
import requests
import urllib3
import re
from kafka import BrokerConnection
from kafka.protocol.admin import *
from kafka import KafkaProducer
from kafka.errors import KafkaError
from kafka import KafkaConsumer
from bs4 import BeautifulSoup
#use for unix time stamp conversion
from datetime import datetime
#transform time zone
from pytz import timezone
from cassandra.cluster import Cluster
#we use multiprocessing to solve the consumer-listening and transform-data-to-Cassandra issues
from multiprocessing import Process, Queue, Pool

def group_name():
    """
    This function is used to print every group name in Kafka
    """
    #initialize a Kafka broker connection to localhost on the fixed port 9092.
    #socket.AF_INET selects the IPv4 address family
    bc = BrokerConnection('localhost', 9092, socket.AF_INET)
    bc.connect_blocking()
    list_groups_request = ListGroupsRequest_v1()
    future = bc.send(list_groups_request)
    while not future.is_done:
        for resp, f in bc.recv():
            f.success(resp)
    for group in future.value.groups:
        print(group)
def scrapy():
    """
    Scrape data from the Citi Bike system. Here is the link: https://gbfs.citibikenyc.com/gbfs/en/station_status.json
    The data we get is in JSON format; we need to transform it into Unicode form
    Input:
        NOTHING
    Output:
        dictionary
    """
    #use requests to get the raw result and store it in r_01
    r_01 = requests.get('https://gbfs.citibikenyc.com/gbfs/en/station_status.json')
    return r_01

def json_transform(jsonstring):
    """
    Have to use this function to start iteration. I don't know why
    """
    msg = json.loads(jsonstring)
    return msg
def data_transform():
    #r_01.text is a Unicode string; json_transform converts the JSON string into a dictionary. data_02 will be a dictionary
    r_01 = requests.get('https://gbfs.citibikenyc.com/gbfs/en/station_status.json')
    #################this is a function reference######################
    data_02 = json_transform(r_01.text)
    ####################################################################
    #use regex to get the very specific data we want
    #this regex extracts entries starting with 'station_id' followed by two or more digits \d{2,}
    data_id = re.findall(r"\'station_id\': \'\d{2,}\'", str(data_02))
    #this regex extracts the timestamp of each record
    data_time = re.findall(r"\'last_reported\': \d+", str(data_02))
    #if we get the correct data, both lists should have the same length
    if len(data_id)==len(data_time):
        print("we have {} records".format(len(data_id)))
    else:
        print("Error, len of data_id and data_time is different.")
    return data_id, data_time
def time_transform():
    """
    The original time is UNIX time; we need to transform it to human-readable time
    Output:
        list = ['2020-08-20 15:07:04','2020-08-20 15:14:52']
    """
    #################this is a function reference######################
    data_id, data_time = data_transform()
    ####################################################################
    #only extract the value in station_id. We don't need the prefix "station_id"
    data_ID = re.findall("\d{2,}", str(data_id))
    #only extract the time value in last_reported
    data_timestamp = re.findall("\d+", str(data_time))
    #create a new list to store the standard timestamp format
    data_daytime=[]
    #timezone is not working well for now
    eastern = timezone('US/Eastern')
    for n in range(0,len(data_timestamp)):
        data_int = int(data_timestamp[n])
        data_utc = datetime.utcfromtimestamp(data_int)
        #change from UTC to EST
        data_daytime.append(eastern.localize(data_utc).strftime('%Y-%m-%d %H:%M:%S'))
    #data_daytime = datetime.fromtimestamp(timestamp)
    print(f"we have successfully transfrom time like this {data_daytime[3]}")
    return data_daytime
def kafka_producer(q):
    """
    I don't know why this is not working. I have to move this section out and put it into the __main__ function
    """
    #################this is a function reference######################
    data_daytime = time_transform()
    ####################################################################
    #this is a very simple version; you can change it and put it into a loop to output all timestamps
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    #we create a topic named 'topic03' and send a daytime value to it. Here, we only send one value at a time.
    #If this were a stream, you could use a for loop to send all times
    producer.send('topic03', data_daytime[3].encode('utf-8'))
    return producer
def kafka_consumer_queue(queue):
    consumer = KafkaConsumer('topic03',
                             group_id=None,
                             bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest')
    consumer_time=[]
    for msg in consumer:
        consumer_time.append(msg.value)
        print("value=%s"%(msg.value))
        queue.put(consumer_time)

def kafka_consumer():
    consumer = KafkaConsumer('topic03',
                             group_id=None,
                             bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest')
    consumer_time=[]
    for msg in consumer:
        consumer_time.append(msg.value)
        print("value=%s"%(msg.value))
def cassandra(consumer_time):
    cluster = Cluster()
    session = cluster.connect('test_keyspace')
    insert_query = "insert into keyspace01.table01 (stattion,status) values(%s,%s)"
    #switch to keyspace keyspace01
    session.execute("USE keyspace01")
    session.execute(insert_query, (str(consumer_time[-2]),str(consumer_time[-1])))
    #session.execute("USE keyspace01")
    #output the current rows
    rows = session.execute('SELECT stattion, status FROM table01')
    for row in rows:
        print(row.stattion, row.status)
if __name__ == '__main__':
    data_daytime = time_transform()
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    producer.send('topic03', data_daytime[3].encode('utf-8'))
we have 1127 records we have successfully transformed time like this 2020-08-31 12:14:08
kafka_consumer()
value=b'2020-08-31 03:24:04' value=b'2020-08-31 03:24:04' value=b'2020-08-31 03:38:28' value=b'2020-08-31 03:38:28' value=b'2020-08-31 11:51:28' value=b'2020-08-31 12:14:08'
(KeyboardInterrupt traceback omitted: the consumer iterator blocks waiting for new messages, so this cell was stopped manually.)
cassandra(consumer_time)
b'2020-08-31 03:38:28' b'2020-08-31 11:51:28'
import logging
import sys
logger = logging.getLogger('kafka')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)
#This part is for Windows: multiprocessing code must be guarded by if __name__ == '__main__'
from multiprocessing import Process
import os
def run_proc(name):
    print("Run child process %s (%s)..." % (name, os.getpid()))
if __name__ == '__main__':
    print("Parent process %s" % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start')
    p.start()
    p.join()
    print("Child process end.")
Parent process 43032 Child process will start Child process end.
kinesis-agent python code
"""
if __name__ == '__main__':
    count = 0
    while count < 3:
        #get a JSON-type file from the Citi Bike resource
        r = requests.get('https://gbfs.citibikenyc.com/gbfs/en/station_status.json')
        #store it in data
        #soup = BeautifulSoup(text, 'html.parser')
        test = "data.log"
        dest_data = time.strftime("/var/log/cadabra/%Y%m%d-%H%M%S.log")
        #insert a newline after every '.' or ',' so the agent sees one record per line
        data_3 = re.sub(r'(?<=[.,])(?=[^\s])', r'\n', r.text)
        with open(test, 'w') as f:
            n = f.write(data_3)
        count += 1
        time.sleep(2)
"""
The consumers above use group_id = None. The snippet below asks the broker directly which consumer groups it knows about:
from kafka import BrokerConnection
from kafka.protocol.admin import *
import socket
bc = BrokerConnection('localhost', 9092, socket.AF_INET)
bc.connect_blocking()
list_groups_request = ListGroupsRequest_v1()
future = bc.send(list_groups_request)
while not future.is_done:
    for resp, f in bc.recv():
        f.success(resp)
for group in future.value.groups:
    print(group)